Step FunctionsとPandasを使ってサーバーレスETL入門
こんにちは、クラスメソッドの岡です。
今回Step Functionsを使って簡単なETL処理を試す機会があったので実際に作ったものを公開します。
サーバーレスでETL処理、といえばAWS Glueが浮かぶかと思いますが、今回はGlueは使わず、LambdaのPythonランタイムでPandasを使ってS3のデータとDynamoDBのデータを結合するような処理を行ってみたいと思います。
ちなみに私はデータ分析に関する知識はほぼ皆無ですが、PythonライブラリPandasを使う事で簡単にデータ処理を行えました。
シナリオ
今回はIoTデバイスから送られてくる時系列データがS3に出力されている前提として、そのファイルとDynamoDBにあるデバイスのマスタデータと結合して分析データとして別のS3バケットに出力する、といったシナリオを想定しています。
構成
サンプルコード
今回はServerless Frameworkを使ってデプロイします。
コードはここのリポジトリにまとめてあります。
環境
- Serverless Framework
- Pipenv
- Pyenv
- Pandas
S3バケットとDynamoDBテーブルを作成
今回は2つのスタックに分割します。
- S3 & DynamoDB
- Lambda & Step Functions
service: serverless-etl-sample-resources provider: name: aws region: ap-northeast-1 stage: ${opt:stage, self:custom.defaultStage} stackName: ${self:custom.appName}-resources custom: appName: serverless-etl-sample defaultStage: dev resources: Resources: DeviceTable: Type: "AWS::DynamoDB::Table" Properties: TableName: Devices AttributeDefinitions: - AttributeName: user_id AttributeType: S - AttributeName: device_id AttributeType: S KeySchema: - AttributeName: user_id KeyType: HASH - AttributeName: device_id KeyType: RANGE BillingMode: PAY_PER_REQUEST DevicesRawDataBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub devices-raw-data-${AWS::AccountId}-${AWS::Region} DevicesDataAnalyticsBucket: Type: AWS::S3::Bucket Properties: BucketName: !Sub devices-data-analytics-${AWS::AccountId}-${AWS::Region} Outputs: DevicesRawDataBucketName: Description: S3 Bucket for devices raw data Value: !Ref DevicesRawDataBucket DevicesDataAnalyticsBucketName: Description: S3 Bucket for analytics data Value: !Ref DevicesDataAnalyticsBucket
- DynamoDBテーブル: Devices
- S3バケット(デバイスの時系列データ): devices-raw-data-${AWS::AccountId}-${AWS::Region}
- S3バケット(分析用データ): devices-data-analytics-${AWS::AccountId}-${AWS::Region}
slsコマンドでデプロイします。
$ sls deploy --config serverless-resources.yml
Lambda & Step Functions を作成
service: serverless-etl-sample useDotenv: true plugins: - serverless-python-requirements - serverless-pseudo-parameters - serverless-step-functions provider: name: aws region: ap-northeast-1 stage: ${opt:stage, self:custom.defaultStage} runtime: python3.8 stackName: ${self:custom.appName} apiName: ${self:custom.appName} lambdaHashingVersion: 20201221 deploymentBucket: name: ${cf:${self:custom.appName}-resources.ServerlessDeploymentBucketName} iam: role: statement: - Effect: Allow Action: - dynamodb:PutItem - dynamodb:DeleteItem - dynamodb:GetItem - dynamodb:Scan - dynamodb:Query - dynamodb:UpdateItem Resource: - arn:aws:dynamodb:#{AWS::Region}:#{AWS::AccountId}:table/Devices - Effect: Allow Action: - s3:GetObject - s3:headObject - s3:ListBucket - s3:PutObject Resource: - arn:aws:s3:::devices-raw-data-#{AWS::AccountId}-#{AWS::Region} - arn:aws:s3:::devices-raw-data-#{AWS::AccountId}-#{AWS::Region}/* - arn:aws:s3:::devices-data-analytics-#{AWS::AccountId}-#{AWS::Region} - arn:aws:s3:::devices-data-analytics-#{AWS::AccountId}-#{AWS::Region}/* environment: LOG_LEVEL: DEBUG POWERTOOLS_SERVICE_NAME: ${self:custom.appName} DEVICES_TABLE_NAME: Devices DEVICES_RAW_DATA_BUCKET_NAME: ${cf:${self:custom.appName}-resources.DevicesRawDataBucketName} DEVICES_DATA_ANALYTICS_BUCKET_NAME: ${cf:${self:custom.appName}-resources.DevicesDataAnalyticsBucketName} custom: appName: serverless-etl-sample defaultStage: dev pythonRequirements: dockerizePip: true slim: true usePipenv: true layer: true package: individually: true exclude: - .git/** - .venv/** - tests/** - README.md - pyrightconfig.json - package** - scripts/** - sample/** - Pipfile** - node_modules/** functions: LoadFile: name: load_file handler: src/handlers/load_file.handler description: "ファイルロード" layers: - !Ref PythonRequirementsLambdaLayer DataJoin: name: data_join handler: src/handlers/data_join.handler description: "データ結合" layers: - !Ref PythonRequirementsLambdaLayer stepFunctions: stateMachines: BatchStateMachine: name: BatchStateMachine definition: Comment: Load the raw data file StartAt: LoadFile States: LoadFile: Type: Task Resource: !GetAtt LoadFile.Arn Next: FileExists FileExists: Type: Choice Choices: - Variable: "$.file_exist" BooleanEquals: true Next: DataJoin - Variable: "$.file_exist" BooleanEquals: false Next: NoFile Default: DataJoin NoFile: Type: Pass End: true DataJoin: Type: Task Resource: !GetAtt DataJoin.Arn End: true
Step Functionsの定義
Step Functionsの定義にはslsのプラグイン Serverless Step Functions を利用しています。
各プロパティについてはプラグインのページを参照してください。
実際にデプロイされたフローは以下です。
LoadFile(Task) でS3にアクセスしてファイルの有無を確認、ファイルがあれば結合処理の DataJoin(Task) に、なければ NoFile(Pass) で処理を終了する、という流れです。 本来であればエラーが起きた際に通知処理を起動したり、エラーハンドリングのフローも含むかと思いますが、ここでは結合処理がメインのため省略して極力シンプルなフローにしています。
依存ライブラリをLayersでデプロイ
こちらもslsのプラグイン Serverless Python Requirements を使って依存ライブラリをLayersとしてデプロイしています。ライブラリ管理はPipenvを使っているので以下のように定義しています。
custom: pythonRequirements: usePipenv: true layer: true
Pandasのデプロイに注意
今回はデータの結合処理にPandasを使っていますが、Linux以外の環境でパッケージングするとLambda実行時に以下のようなImport Errorが発生します。
Original error was: No module named 'numpy.core._multiarray_umath'
serverless-python-requirementsのdockerizePipを有効化すると、LambdaのDockerイメージでビルド処理を行ってくれて上記のエラーを回避できます。 ついでにslimオプションを有効化してサイズを削減します。
custom: pythonRequirements: dockerizePip: true slim: true
【Lambda】 load_fileのコード
load_fileの中ではS3オブジェクトが存在するかどうか、のみを確認しています。オブジェクトがなければ、{'file_exist': False}を返却。オブジェクトがあれば後続のLambdaにバケット名・オブジェクトキーを渡します。
import os import boto3 import botocore from aws_lambda_powertools import Logger from aws_lambda_powertools.utilities.typing import LambdaContext logger = Logger() DEVICES_RAW_DATA_BUCKET_NAME = os.getenv( 'DEVICES_RAW_DATA_BUCKET_NAME') @logger.inject_lambda_context(log_event=True) def handler(event, context: LambdaContext): logger.info(event) s3 = boto3.resource('s3') key = 'devices-raw-data.json' # ファイルがあるか確認 try: logger.debug(DEVICES_RAW_DATA_BUCKET_NAME) s3.Object(DEVICES_RAW_DATA_BUCKET_NAME, key).load() except botocore.exceptions.ClientError as e: logger.warn(e) logger.info('File Not Exist.') return {'file_exist': False'} logger.info('File Exist.') return { 'file_exist': True, 'bucket_name': DEVICES_RAW_DATA_BUCKET_NAME, 's3_object_key': key}
Loggerの処理でaws_lambda_powertoolsを利用していますが、必須ではありません。 aws_lambda_powertoolsの使い方については以下の記事を参考にしてください。
【Lambda】 data_join
import json import os from datetime import datetime import boto3 import botocore import pandas as pd from aws_lambda_powertools import Logger from aws_lambda_powertools.utilities.typing import LambdaContext logger = Logger() DEVICES_DATA_ANALYTICS_BUCKET_NAME = os.getenv( 'DEVICES_DATA_ANALYTICS_BUCKET_NAME') @logger.inject_lambda_context(log_event=True) def handler(event, context: LambdaContext): logger.info(event) # S3からデータを取得 s3 = boto3.resource('s3') obj = s3.Bucket(event['bucket_name']).Object(event['s3_object_key']) response = obj.get() body = response['Body'].read().decode('utf-8') devices_raw_data = json.loads(body)['data'] logger.debug(devices_raw_data) # Dynamoからデータ取得 dynamodb = boto3.resource('dynamodb') TABLE_NAME = os.getenv('DEVICES_TABLE_NAME') table = dynamodb.Table(TABLE_NAME) response = table.scan() devices = response.get('Items', []) logger.debug(devices) # Pandasでデータ結合 df_devices_raw_data = pd.DataFrame(devices_raw_data) df_devices = pd.DataFrame(devices) devices_analysis_data = df_devices_raw_data.merge(df_devices) logger.debug(devices_analysis_data) # S3出力 csv_data = devices_analysis_data.to_csv(encoding='utf-8', index=None) today = datetime.now().strftime('%Y%m%d') key = f'devices_operation_history_data/{today}.csv' obj = s3.Bucket(DEVICES_DATA_ANALYTICS_BUCKET_NAME).Object(key) obj.put(Body=csv_data) return {'result': 'Success.'}
ちょっと詰め込んだ感がありますが、S3とDynamoDBからそれぞれデータ取得、Pandasで結合処理、分析用バケットにCSV出力まで実行します。
Pandasでデータ結合
デバイスの時系列データに対してデバイスのマスタデータを左結合しています。 処理結果は以下のようになります。Pandas便利ですね。
>>> df_devices_raw_data = pd.DataFrame(devices_raw_data) >>> df_devices = pd.DataFrame(devices) >>> df_devices_raw_data device_id timestamp power 0 device_A 1616506124 on 1 device_B 1616506165 on 2 device_A 1616506185 off 3 device_C 1616506197 off >>> df_devices device_id user_id type 0 device_A user_0001 TV 1 device_B user_0001 エアコン 2 device_C user_0002 TV >>> df_devices_raw_data.merge(df_devices) device_id timestamp power user_id type 0 device_A 1616506124 on user_0001 TV 1 device_A 1616506185 off user_0001 TV 2 device_B 1616506165 on user_0001 エアコン 3 device_C 1616506197 off user_0002 TV
結合結果をCSVで出力
CSVデータへの変換もPandasで実行しています。
csv_data = devices_analysis_data.to_csv(encoding='utf-8', index=None)
DataFrameのオブジェクトに対して、to_csv()を呼ぶだけでCSVデータへ変換できます。
S3への出力はboto3のResource APIを使っています。
obj = s3.Bucket(DEVICES_DATA_ANALYTICS_BUCKET_NAME).Object(key) obj.put(Body=csv_data)
疎通確認
サンプルデータの投入
上記のSLSテンプレートのデプロイ後、AWS CLIが設定されている状態でサンプルコードのディレクトリで以下のスクリプトを実行してください。
$ python scripts/set_sample_data.py
Devicesテーブルにデバイスデータが投入されます。
次に以下の時系列サンプルデータをS3バケットにアップロードします。
{ "data": [ {"device_id": "device_u33nk7yy", "timestamp": "1616506124", "power": "on"}, {"device_id": "device_m6mww3ss", "timestamp": "1616506165", "power": "on"}, {"device_id": "device_u33nk7yy", "timestamp": "1616506185", "power": "off"}, {"device_id": "device_sd7ubc8s", "timestamp": "1616506197", "power": "off"} ] }
$ aws s3 cp sample/devices-raw-data.json s3://devices-raw-data-999999999999-ap-northeast-1/
Step Functionsを実行
コンソールからでもOKですが、SLSのプラグインから実行する事も可能です。
$ sls invoke stepf --name BatchStateMachine { executionArn: 'arn:aws:states:ap-northeast-1:999999999999:execution:BatchStateMachine:83b464d9-e4ce-4172-8834-bc8340f5dc1e', stateMachineArn: 'arn:aws:states:ap-northeast-1:999999999999:stateMachine:BatchStateMachine', name: '83b464d9-e4ce-4172-8834-bc8340f5dc1e', status: 'SUCCEEDED', startDate: 2021-03-23T14:55:48.505Z, stopDate: 2021-03-23T14:55:53.472Z, input: '{}', inputDetails: { included: true }, output: '{"result": "Success."}', outputDetails: { included: true } }
出力されたCSVファイルを確認
実際に出力されたCSVは以下のようになっていました。時系列データにデバイスのユーザー情報などが含められていたので想定通りです。
device_id,timestamp,power,user_id,type device_u33nk7yy,1616506124,on,user_0001,TV device_u33nk7yy,1616506185,off,user_0001,TV device_m6mww3ss,1616506165,on,user_0001,エアコン device_sd7ubc8s,1616506197,off,user_0002,TV